package com.lwq.fast.log.fastlogserver.rocketmq.listener;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.lwq.fast.log.fastlogcore.constant.Constants;
import com.lwq.fast.log.fastlogcore.entity.Message;
import com.lwq.fast.log.fastlogserver.es.service.ElasticSearchService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;


/**
 * 消息监听器，监听rocketmq消息
 * @author 刘文强
 */
@Configuration
@ConditionalOnProperty(name = "active.client", havingValue = "rocketmq")
@Slf4j
public class RocketMqListener {

    @Value("${rocketmq.config.namesrv_addr}")
    private String namesrvAddr;
    @Value("${rocketmq.config.pull_batch_size}")
    private int pullBatchSize;
    @Value("${rocketmq.config.consume_message_batch_max_size}")
    private int consumeMessageBatchMaxSize;
    @Value("${rocketmq.config.pull_threshold_for_queue}")
    private int pullThresholdForQueue;
    @Value("${rocketmq.config.consume_thread_min}")
    private int consumeThreadMin;
    @Value("${rocketmq.config.consume_thread_max}")
    private int consumeThreadMax;

    @Autowired
    private ElasticSearchService elasticSearchService;


    /**
     * 消费者
     * @return
     */
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer onMessage(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constants.ROCKETMQ_CONSUMER_GROUP);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setPullBatchSize(pullBatchSize);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setPullThresholdForQueue(pullThresholdForQueue);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        try {
            consumer.subscribe(Constants.ROCKETMQ_MESSAGE_TOPIC, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                try {
                    List<String> msgList = new ArrayList<>();
                    msgs.forEach(mqMsg ->{
                        String messageContent = getMessageContent(mqMsg);
                        msgList.add(messageContent);
                    });
                    batchInsert(msgList);
                    msgList.clear();
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            consumer.start();
        } catch (Exception e) {
            log.error("MQ 消费端启动失败",e);
        }
        return consumer;
    }

    private void batchInsert(List<String> msgList) {
        List<Message> messageList = new ArrayList<>();
        for (String msg : msgList) {
            try{
                Message message = JSON.parseObject(msg, Message.class);
                if (ObjectUtil.isNotNull(message)){
                    messageList.add(message);
                }
            }catch (Exception e){
                // 序列化出错，静默处理
            }
        }
        if (CollectionUtil.isNotEmpty(messageList)){
            try{
                elasticSearchService.logMessage2EsBatch(messageList);
            }catch (Exception e){
                log.error("批量写入数据到Es出错",e);
            }
        }
    }


    /**
     * 获取消息
     *
     * @param msg
     * @return
     */
    public String getMessageContent(MessageExt msg) {
        byte[] body = msg.getBody();
        String content = "";
        try {
            content = new String(body, RemotingHelper.DEFAULT_CHARSET);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return content;
    }
}
